{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 本例只是作为算法示例，生产环境中不建议使用，因为这种排序的代价极大，特别是在大数据环境下。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark,random,datetime"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "d = datetime.datetime.strptime('2017-01-01 00:00:00',\n",
    "                               '%Y-%m-%d %H:%M:%S')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "ls = [(random.choice([\"info\",\"warning\",\"error\"]),\n",
    "      d + datetime.timedelta(seconds=random.randint(0,1000000)))\n",
    "     for i in range(10000)]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('warning', datetime.datetime(2017, 1, 11, 6, 5, 27)),\n",
       " ('info', datetime.datetime(2017, 1, 6, 16, 35, 31)),\n",
       " ('info', datetime.datetime(2017, 1, 4, 18, 29, 55)),\n",
       " ('info', datetime.datetime(2017, 1, 4, 21, 14, 8))]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ls[1:5]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = pyspark.SparkContext()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd = sc.parallelize(ls).cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('error', datetime.datetime(2017, 1, 1, 18, 20, 56)),\n",
       " ('warning', datetime.datetime(2017, 1, 11, 6, 5, 27)),\n",
       " ('info', datetime.datetime(2017, 1, 6, 16, 35, 31)),\n",
       " ('info', datetime.datetime(2017, 1, 4, 18, 29, 55)),\n",
       " ('info', datetime.datetime(2017, 1, 4, 21, 14, 8))]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "rdd2 = rdd.groupByKey()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 分组，数据如下："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "info\n",
      "[datetime.datetime(2017, 1, 6, 16, 35, 31), datetime.datetime(2017, 1, 4, 18, 29, 55), datetime.datetime(2017, 1, 4, 21, 14, 8), datetime.datetime(2017, 1, 6, 23, 47, 1), datetime.datetime(2017, 1, 4, 19, 57, 31), datetime.datetime(2017, 1, 12, 6, 11, 57), datetime.datetime(2017, 1, 7, 13, 32, 47), datetime.datetime(2017, 1, 6, 12, 47, 49), datetime.datetime(2017, 1, 10, 16, 41, 53), datetime.datetime(2017, 1, 6, 0, 53, 26)]\n",
      "error\n",
      "[datetime.datetime(2017, 1, 1, 18, 20, 56), datetime.datetime(2017, 1, 5, 8, 47, 43), datetime.datetime(2017, 1, 11, 5, 0, 27), datetime.datetime(2017, 1, 4, 9, 9, 10), datetime.datetime(2017, 1, 4, 18, 31, 10), datetime.datetime(2017, 1, 10, 16, 55, 1), datetime.datetime(2017, 1, 11, 20, 0, 36), datetime.datetime(2017, 1, 1, 17, 56, 23), datetime.datetime(2017, 1, 2, 5, 45, 22), datetime.datetime(2017, 1, 8, 6, 8, 42)]\n",
      "warning\n",
      "[datetime.datetime(2017, 1, 11, 6, 5, 27), datetime.datetime(2017, 1, 2, 13, 35, 8), datetime.datetime(2017, 1, 8, 12, 37, 24), datetime.datetime(2017, 1, 6, 15, 20, 42), datetime.datetime(2017, 1, 8, 15, 52, 2), datetime.datetime(2017, 1, 12, 8, 58, 12), datetime.datetime(2017, 1, 4, 20, 56, 43), datetime.datetime(2017, 1, 8, 0, 48, 18), datetime.datetime(2017, 1, 11, 9, 3, 18), datetime.datetime(2017, 1, 11, 19, 16, 18)]\n"
     ]
    }
   ],
   "source": [
    "for t in rdd2.collect():\n",
    "    print(t[0])\n",
    "    flag = 0\n",
    "    print(list(t[1])[:10])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 定义一个map方法，进行排序，并且取出前10条"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 再次强调，在大数据环境下，这种方式无论是性能还是效果都很不实用"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "def mymap(x):\n",
    "    key = x[0]\n",
    "    value = sorted(list(x[1]))[0:10]\n",
    "    return (key,value)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('info',\n",
       "  [datetime.datetime(2017, 1, 1, 0, 4, 56),\n",
       "   datetime.datetime(2017, 1, 1, 0, 11, 43),\n",
       "   datetime.datetime(2017, 1, 1, 0, 12, 36),\n",
       "   datetime.datetime(2017, 1, 1, 0, 13, 35),\n",
       "   datetime.datetime(2017, 1, 1, 0, 22, 40),\n",
       "   datetime.datetime(2017, 1, 1, 0, 24, 20),\n",
       "   datetime.datetime(2017, 1, 1, 0, 30, 31),\n",
       "   datetime.datetime(2017, 1, 1, 0, 40, 23),\n",
       "   datetime.datetime(2017, 1, 1, 0, 46, 8),\n",
       "   datetime.datetime(2017, 1, 1, 0, 46, 50)]),\n",
       " ('error',\n",
       "  [datetime.datetime(2017, 1, 1, 0, 1, 31),\n",
       "   datetime.datetime(2017, 1, 1, 0, 7, 21),\n",
       "   datetime.datetime(2017, 1, 1, 0, 9, 19),\n",
       "   datetime.datetime(2017, 1, 1, 0, 11, 56),\n",
       "   datetime.datetime(2017, 1, 1, 0, 23, 38),\n",
       "   datetime.datetime(2017, 1, 1, 0, 39, 26),\n",
       "   datetime.datetime(2017, 1, 1, 0, 44, 17),\n",
       "   datetime.datetime(2017, 1, 1, 0, 47, 15),\n",
       "   datetime.datetime(2017, 1, 1, 0, 53, 58),\n",
       "   datetime.datetime(2017, 1, 1, 0, 54, 9)]),\n",
       " ('warning',\n",
       "  [datetime.datetime(2017, 1, 1, 0, 0, 37),\n",
       "   datetime.datetime(2017, 1, 1, 0, 13, 7),\n",
       "   datetime.datetime(2017, 1, 1, 0, 15, 33),\n",
       "   datetime.datetime(2017, 1, 1, 0, 24, 57),\n",
       "   datetime.datetime(2017, 1, 1, 0, 27, 28),\n",
       "   datetime.datetime(2017, 1, 1, 0, 31, 12),\n",
       "   datetime.datetime(2017, 1, 1, 0, 38, 18),\n",
       "   datetime.datetime(2017, 1, 1, 0, 43, 56),\n",
       "   datetime.datetime(2017, 1, 1, 0, 59),\n",
       "   datetime.datetime(2017, 1, 1, 0, 59, 6)])]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd2.map(lambda x:mymap(x)).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 以上场景，海量数据情况下，是使用filter + sortBy算子"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('info', datetime.datetime(2017, 1, 1, 0, 4, 56)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 11, 43)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 12, 36)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 13, 35)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 22, 40)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 24, 20)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 30, 31)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 40, 23)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 46, 8)),\n",
       " ('info', datetime.datetime(2017, 1, 1, 0, 46, 50))]"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd.filter(lambda x :(x[0] ==\"info\"))\\\n",
    ".sortBy(lambda x : (x[1],x)).take(10)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
